feat: add executor pool support #687
27790b7 to 6694105
wgtmac left a comment
Nice work. The abstraction is clean and the test coverage is solid. Since the default executor is nullopt, existing paths stay single-threaded, so the risk is well contained.
A couple of things before merge:
- The PR has no description. Worth adding the motivation and a short design note, ideally with the parallel/serial numbers that justify it.
- The RetryRunner change from a runtime fluent API (OnlyRetryOn/StopRetryOn) to the compile-time RetryRunner<Policy> is a breaking API change that's independent of executor support. Consider splitting it into its own PR so it gets reviewed on its own merits.
Left a few inline notes.
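To make the compile-time policy point concrete, here is a minimal sketch of what a policy-templated runner could look like. The ErrorKind values, the ShouldRetry hook, and the Run signature are all assumptions made for illustration; the PR's actual RetryRunner is not reproduced in this thread.

```cpp
#include <cassert>
#include <functional>

// Hypothetical error kinds; the project's real ErrorKind enum may differ.
enum class ErrorKind { kOk, kNotFound, kIOError };

// Compile-time policy: retry every failure except the listed kind.
template <ErrorKind Stop>
struct StopRetryOn {
  static constexpr bool ShouldRetry(ErrorKind kind) { return kind != Stop; }
};

// Compile-time policy: retry only the listed kind.
template <ErrorKind Only>
struct OnlyRetryOn {
  static constexpr bool ShouldRetry(ErrorKind kind) { return kind == Only; }
};

// Illustrative runner. The policy is a template parameter, so the retry
// decision is fixed in the type rather than configured through a fluent API.
template <typename Policy>
class RetryRunner {
 public:
  explicit RetryRunner(int max_retries) : max_retries_(max_retries) {}

  // Calls `task` until it returns kOk, the policy refuses to retry,
  // or the retry budget is used up. Returns the last result.
  ErrorKind Run(const std::function<ErrorKind()>& task) const {
    ErrorKind result = task();
    int retries = 0;
    while (result != ErrorKind::kOk && retries < max_retries_ &&
           Policy::ShouldRetry(result)) {
      result = task();
      ++retries;
    }
    return result;
  }

 private:
  int max_retries_;
};
```

With this shape, a caller that used to chain a runtime StopRetryOn call has to name the policy in the type instead, which is why the change is source-breaking for existing users.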
wgtmac left a comment
A few more minor notes, non-blocking.
wgtmac left a comment
Another pass, this time on the interface design and extensibility.
The overall shape is good: one virtual Executor that engines implement, threaded through the builders via PlanWith. The Arrow adapter test is a nice proof that an external pool drops in with basically one line, so "bring your own threadpool" is well covered.
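A rough sketch of the "bring your own threadpool" shape, for readers who have not opened the diff. Everything here is hypothetical: the Submit signature, the ExternalPool stand-in (which runs tasks inline for brevity), and the ScanBuilder class are illustrative names, and a raw pointer is used where the PR reportedly uses an optional executor.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical interface; the PR's actual Executor signature is not shown here.
class Executor {
 public:
  virtual ~Executor() = default;
  virtual void Submit(std::function<void()> task) = 0;
};

// Stand-in for an engine-owned pool with its own API.
// It runs the task inline so the sketch stays single-threaded.
class ExternalPool {
 public:
  void Spawn(std::function<void()> fn) {
    fn();
    ++spawned;
  }
  int spawned = 0;
};

// The adapter is the only glue an engine would have to write.
class ExternalPoolExecutor : public Executor {
 public:
  explicit ExternalPoolExecutor(ExternalPool* pool) : pool_(pool) {}
  void Submit(std::function<void()> task) override {
    pool_->Spawn(std::move(task));
  }

 private:
  ExternalPool* pool_;
};

// Builder-style opt-in, loosely modeled on the PlanWith call mentioned above.
class ScanBuilder {
 public:
  ScanBuilder& PlanWith(Executor* executor) {
    executor_ = executor;
    return *this;
  }

  // Hands each task to the executor if one was set, otherwise runs it inline.
  void Plan(const std::vector<std::function<void()>>& tasks) {
    for (const auto& task : tasks) {
      if (executor_ != nullptr) {
        executor_->Submit(task);
      } else {
        task();
      }
    }
  }

 private:
  Executor* executor_ = nullptr;  // unset means the serial path
};
```

The point of the sketch is the size of ExternalPoolExecutor: one forwarding call is all that separates an engine's pool from the library's interface.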
On future async directions (C++20 coroutines, P2300 std::execution): the model here is synchronous and blocking. TaskGroup::Run() fans out and blocks on std::future::get(), and the planning APIs return Result<...> directly. So this is a parallel-for primitive, not a step toward a sender/receiver pipeline. That's a reasonable scope for now; I'd just flag it explicitly so nobody expects this interface to extend into async later. Async support would need a separate interface. Details inline.
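The blocking fan-out described above can be sketched in a few lines. This is not the PR's TaskGroup: the class name and the int-returning tasks are hypothetical, and std::async stands in for submitting work to an executor.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task group; names and signatures are assumed.
class BlockingTaskGroup {
 public:
  void Add(std::function<int()> task) { tasks_.push_back(std::move(task)); }

  // Launches every task with std::async, then blocks on each future in
  // submission order. The caller resumes only after all tasks have finished.
  std::vector<int> Run() {
    std::vector<std::future<int>> futures;
    futures.reserve(tasks_.size());
    for (auto& task : tasks_) {
      futures.push_back(std::async(std::launch::async, task));
    }
    std::vector<int> results;
    results.reserve(futures.size());
    for (auto& future : futures) {
      results.push_back(future.get());  // blocking wait on this task
    }
    return results;
  }

 private:
  std::vector<std::function<int()>> tasks_;
};
```

Because Run() returns plain values after waiting, there is nothing for a coroutine or a sender to attach to, which is the sense in which this is a parallel-for rather than an async building block.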
wgtmac left a comment
One more pass. The main thing I found is a documentation gap around a new concurrency contract that now leaks onto user-supplied callbacks.
Once an executor is set, the user's ManifestWriterFactory and the shared FileIO get called from multiple worker threads. The tests already account for this (the factories use an atomic counter plus a barrier), so the requirement is understood; it's just not written down anywhere a downstream engine would see it. Worth documenting on the public surface.
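As an illustration of what the documented contract would ask of a downstream engine, here is a minimal sketch of a callback object that tolerates concurrent calls. The class, its NextPath method, and the generated file names are hypothetical and do not reflect the real ManifestWriterFactory signature; the barrier used in the tests is omitted.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

// Hypothetical factory used only to illustrate the thread-safety requirement.
class CountingPathFactory {
 public:
  // Safe to call from several threads: the counter is atomic and the
  // shared vector is only touched while the mutex is held.
  std::string NextPath() {
    int id = next_id_.fetch_add(1, std::memory_order_relaxed);
    std::string path = "manifest-" + std::to_string(id) + ".avro";
    std::lock_guard<std::mutex> lock(mutex_);
    issued_.push_back(path);
    return path;
  }

  // Returns a copy of every path handed out so far.
  std::vector<std::string> Issued() {
    std::lock_guard<std::mutex> lock(mutex_);
    return issued_;
  }

 private:
  std::atomic<int> next_id_{0};
  std::mutex mutex_;
  std::vector<std::string> issued_;
};

// Calls the factory from `num_threads` workers, `per_thread` times each.
inline void HammerFactory(CountingPathFactory& factory, int num_threads,
                          int per_thread) {
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&factory, per_thread] {
      for (int i = 0; i < per_thread; ++i) factory.NextPath();
    });
  }
  for (auto& worker : workers) worker.join();
}

// True when no path was handed out twice.
inline bool AllUnique(const std::vector<std::string>& paths) {
  return std::set<std::string>(paths.begin(), paths.end()).size() ==
         paths.size();
}
```

A factory that kept a plain int counter or an unguarded vector would compile and pass on the serial path, then race once an executor is configured, which is why the requirement belongs in the public docs.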
 */

// Borrowed the file from Apache Arrow:
// https://github.com/apache/arrow/blob/main/cpp/src/arrow/util/functional.h
Rebases onto apache#687's executor pool support per @wgtmac's review:

* FileCleanupStrategy now takes an OptionalExecutor in its constructor. When the custom DeleteWith() callback is configured, per-path deletes fan out through a TaskGroup that uses the executor (or runs serially when none is set, preserving prior behavior).
* The FileIO bulk delete path wraps file_io_->DeleteFiles in a tight RetryRunner<retry::StopRetryOn<kNotFound>> so transient FileIO errors no longer give up after the first attempt. Mirrors Java's Tasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N).
* Adds ExpireSnapshots::Executor(OptionalExecutor) so callers can opt in to parallel deletion, and threads it down through both IncrementalFileCleanup and ReachableFileCleanup.
* Drops the std::async / std::thread / std::span machinery and the ad-hoc retry loop -- replaced by util/task_group.h and util/retry_util.h from apache#687.

Adds ExecutorDispatchesDeletesConcurrently test that wires a test::ThreadExecutor through the new API and asserts the executor received one submission per file.
…649)

### What

* `FileCleanupStrategy` now takes an `OptionalExecutor` in its constructor. When `DeleteWith()` is configured, per-path deletes fan out through `TaskGroup<>` -- uses the supplied executor when set, otherwise runs the callbacks serially on the calling thread via `RunTasksSingleThreaded`. Preserves prior single-threaded behavior by default.
* The `FileIO::DeleteFiles` bulk path is wrapped in `RetryRunner<retry::StopRetryOn<ErrorKind::kNotFound>>` with a tight budget (2 retries, 100ms-1s backoff, 5s total). Mirrors Java's `Tasks.foreach(...).stopRetryOn(NotFoundException.class).retry(N)`. The retry primarily helps atomic-bulk FileIO impls (e.g. an S3 `DeleteObjects`-backed FileIO) ride out transient throttles -- best-effort, see code comment for the fail-fast-iterative caveat.
* New public builder `ExpireSnapshots::ExecuteDeleteWith(OptionalExecutor)` (named after Java's `executeDeleteWith(ExecutorService)`) so callers opt in to parallel deletion; threaded through both `IncrementalFileCleanup` and `ReachableFileCleanup`.
* `DeleteWith()` doc note clarifies that the user-supplied callback may be invoked concurrently from worker threads and must be thread-safe.
* Drops the `std::async` / `std::thread` / `std::span` machinery and the ad-hoc retry loop -- replaced by `util/task_group.h` and `util/retry_util.h` from #687.

### Test coverage

Existing 25 `ExpireSnapshots*` tests continue to pass unchanged (they hit the default no-executor path). Adds `ExpireSnapshotsCleanupTest.ExecutorDispatchesDeletesConcurrently` -- wires a `test::ThreadExecutor` through `ExecuteDeleteWith()`, runs an `ExpireSnapshotId` cleanup with a custom `DeleteWith` callback, and asserts the executor received one submission per file (data + manifest + manifest-list = 3) and that all paths show up in the mutex-guarded sink.
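To give a feel for how tight the quoted retry budget is, the following sketch computes the wait before each retry. The `RetryBudget` struct, its field names, and the doubling factor are assumptions for illustration only; the commit states the numbers (2 retries, 100ms-1s backoff, 5s total) but not how `util/retry_util.h` grows the delay. The sketch also counts only waiting time against the total, whereas a real runner would likely measure wall-clock time.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

using std::chrono::milliseconds;

// Hypothetical config mirroring the numbers quoted above; field names are assumed.
struct RetryBudget {
  int max_retries = 2;
  milliseconds min_wait{100};
  milliseconds max_wait{1000};
  milliseconds total_timeout{5000};
};

// Returns the wait before each retry, assuming the delay doubles per attempt
// and is capped at max_wait. Stops early if the accumulated wait would
// exceed the total timeout.
inline std::vector<milliseconds> BackoffSchedule(const RetryBudget& budget) {
  std::vector<milliseconds> waits;
  milliseconds wait = budget.min_wait;
  milliseconds elapsed{0};
  for (int attempt = 0; attempt < budget.max_retries; ++attempt) {
    if (elapsed + wait > budget.total_timeout) break;
    waits.push_back(wait);
    elapsed += wait;
    wait = std::min<milliseconds>(wait * 2, budget.max_wait);
  }
  return waits;
}
```

Under these assumptions the default budget waits 100ms and then 200ms before giving up, so a bulk delete that keeps failing costs well under a second of added delay.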
No description provided.